package com.hao.chapter11;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class GroupAggregation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //1.在创建表的DDL中直接定义时间属性
        String creatDDL = "CREATE TABLE clickTable (" +
                "user_name STRING," +
                "url STRING," +
                "ts BIGINT," +
                "et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000))," + //事件时间  FROM_UNIXTIME() 能转换为年月日时分秒这样的格式 转换秒
                " WATERMARK FOR et AS et - INTERVAL '1' SECOND " + //watermark 延迟一秒
                ")WITH(" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.txt'," +
                " 'format' = 'csv'" +
                ")";

        tableEnv.executeSql(creatDDL);


        //聚合查询转换
        //1.分组聚合
        Table aggTable = tableEnv.sqlQuery("select user_name,count(1) from clickTable group by user_name");

        //2.分组窗口聚合(老版本)
        String sql = "select " +
                "  user_name,count(1) as cnt, " +
                "  TUMBLE_END(et,INTERVAL '5' SECOND) AS endT " +
                " from clickTable " +
                " group by " +
                "  user_name," +
                "  TUMBLE_END(et,INTERVAL '5' SECOND)" ;
        /**
         * 如下异常
         * Exception in thread "main" org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: TUMBLE_END(TIMESTAMP(3), INTERVAL SECOND(3) NOT NULL)
         * If you think this function should be supported, you can create an issue and start a discussion for it.
         */
        Table groupWindowResult = tableEnv.sqlQuery(sql);

        tableEnv.toChangelogStream(aggTable).print("agg:");
        //报异常
        tableEnv.toChangelogStream(groupWindowResult).print("group window:");

        env.execute();
    }
}
